package logMonitor.bolt;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import logMonitor.domain.Message;
import logMonitor.utils.MonitorHandler;

import java.util.Map;

/**
 * Created by lenovo on 2017/12/28.
 */
public class MessageFilterBolt extends BaseBasicBolt {
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        //本地：===================================
        //获取KafkaSpout发送出来的数据
//        String line = input.getString(0);
        //集群：===================================
        //获取kafka发送的数据，是一个byte数组
        byte[] value = (byte[]) input.getValue(0);
        //将数组转化成字符串
        String line = new String(value);
        //<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
        //对数据进行解析
        // appid   content
        //1  error: Caused by: java.lang.NoClassDefFoundError: com/starit/gejie/dao/SysNameDao
        Message message = MonitorHandler.parser(line);
        if (message == null) {
            return;
        }
        collector.emit(new Values(message));
        //定时更新规则信息
        MonitorHandler.scheduleLoad();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}
